home *** CD-ROM | disk | FTP | other *** search
/ Complete Linux / Complete Linux.iso / docs / apps / database / postgres / postgre4.z / postgre4 / src / access / index-rtree / rtree.c < prev    next >
Encoding:
C/C++ Source or Header  |  1992-08-27  |  23.1 KB  |  874 lines

  1. /*
  2.  *  rtree.c -- interface routines for the postgres rtree indexed access
  3.  *       method.
  4.  */
  5. #include "tmp/c.h"
  6. #include "tmp/postgres.h"
  7.  
  8. #include "storage/bufmgr.h"
  9. #include "storage/bufpage.h"
  10. #include "storage/page.h"
  11.  
  12. #include "utils/log.h"
  13. #include "utils/rel.h"
  14. #include "utils/excid.h"
  15.  
  16. #include "access/heapam.h"
  17. #include "access/genam.h"
  18. #include "access/ftup.h"
  19. #include "access/rtree.h"
  20. #include "access/funcindex.h"
  21.  
  22. #include "nodes/execnodes.h"
  23. #include "nodes/plannodes.h"
  24.  
  25. #include "executor/x_qual.h"
  26. #include "executor/x_tuples.h"
  27. #include "executor/tuptable.h"
  28.  
  29. #include "lib/index.h"
  30.  
  31. extern ExprContext RMakeExprContext();
  32.  
  33. RcsId("$Header: /private/postgres/src/access/index-rtree/RCS/rtree.c,v 1.18 1992/08/24 23:14:23 mao Exp $");
  34.  
  35. extern InsertIndexResult    rtdoinsert();
  36. extern InsertIndexResult    dosplit();
  37. extern int                nospace();
  38.  
  39. typedef struct SPLITVEC {
  40.     OffsetNumber    *spl_left;
  41.     int            spl_nleft;
  42.     char        *spl_ldatum;
  43.     OffsetNumber    *spl_right;
  44.     int            spl_nright;
  45.     char        *spl_rdatum;
  46. } SPLITVEC;
  47.  
  48. void
  49. rtbuild(heap, index, natts, attnum, istrat, pcount, params, finfo, pred)
  50.     Relation heap;
  51.     Relation index;
  52.     AttributeNumber natts;
  53.     AttributeNumber *attnum;
  54.     IndexStrategy istrat;
  55.     uint16 pcount;
  56.     Datum *params;
  57.     FuncIndexInfo *finfo;
  58.     LispValue pred;
  59. {
  60.     HeapScanDesc scan;
  61.     Buffer buffer;
  62.     AttributeNumber i;
  63.     HeapTuple htup;
  64.     IndexTuple itup;
  65.     TupleDescriptor hd, id;
  66.     InsertIndexResult res;
  67.     Datum *d;
  68.     Boolean *nulls;
  69.     int nb, nh, ni;
  70.     ExprContext econtext;
  71.     TupleTable tupleTable;
  72.     TupleTableSlot slot;
  73.     ObjectId hrelid, irelid;
  74.  
  75.     /* rtrees only know how to do stupid locking now */
  76.     RelationSetLockForWrite(index);
  77.  
  78.     /*
  79.      *  We expect to be called exactly once for any index relation.
  80.      *  If that's not the case, big trouble's what we have.
  81.      */
  82.  
  83.     if ((nb = RelationGetNumberOfBlocks(index)) != 0)
  84.     elog(WARN, "%.16s already contains data", index->rd_rel->relname);
  85.  
  86.     /* initialize the root page */
  87.     buffer = ReadBuffer(index, P_NEW);
  88.     RTInitBuffer(buffer, F_LEAF);
  89.     WriteBuffer(buffer);
  90.  
  91.     /* init the tuple descriptors and get set for a heap scan */
  92.     hd = RelationGetTupleDescriptor(heap);
  93.     id = RelationGetTupleDescriptor(index);
  94.     d = LintCast(Datum *, palloc(natts * sizeof (*d)));
  95.     nulls = LintCast(Boolean *, palloc(natts * sizeof (*nulls)));
  96.  
  97.     /*
  98.      * If this is a predicate (partial) index, we will need to evaluate the
  99.      * predicate using ExecQual, which requires the current tuple to be in a
  100.      * slot of a TupleTable.  In addition, ExecQual must have an ExprContext
  101.      * referring to that slot.  Here, we initialize dummy TupleTable and
  102.      * ExprContext objects for this purpose. --Nels, Feb '92
  103.      */
  104.     if (pred != LispNil) {
  105.     tupleTable = ExecCreateTupleTable(1);
  106.     slot = (TupleTableSlot)
  107.         ExecGetTableSlot(tupleTable, ExecAllocTableSlot(tupleTable));
  108.     econtext = RMakeExprContext();
  109.     FillDummyExprContext(econtext, slot, hd, buffer);
  110.     }
  111.  
  112.     scan = heap_beginscan(heap, 0, NowTimeQual, 0, (ScanKey) NULL);
  113.     htup = heap_getnext(scan, 0, &buffer);
  114.  
  115.     /* count the tuples as we insert them */
  116.     nh = ni = 0;
  117.  
  118.     for (; HeapTupleIsValid(htup); htup = heap_getnext(scan, 0, &buffer)) {
  119.  
  120.     nh++;
  121.  
  122.     /* Skip this tuple if it doesn't satisfy the partial-index predicate */
  123.     if (pred != LispNil) {
  124.         SetSlotContents(slot, htup);
  125.         if (ExecQual(pred, econtext) == false)
  126.         continue;
  127.     }
  128.  
  129.     ni++;
  130.  
  131.     /*
  132.      *  For the current heap tuple, extract all the attributes
  133.      *  we use in this index, and note which are null.
  134.      */
  135.  
  136.     for (i = 1; i <= natts; i++) {
  137.         AttributeOffset attoff;
  138.         Boolean attnull;
  139.  
  140.         /*
  141.          *  Offsets are from the start of the tuple, and are
  142.          *  zero-based; indices are one-based.  The next call
  143.          *  returns i - 1.  That's data hiding for you.
  144.          */
  145.  
  146.         attoff = AttributeNumberGetAttributeOffset(i);
  147.         /*
  148.         d[attoff] = HeapTupleGetAttributeValue(htup, buffer,
  149.         */
  150.         d[attoff] = GetIndexValue(htup, 
  151.                       hd,
  152.                       attoff, 
  153.                       attnum, 
  154.                       finfo, 
  155.                       &attnull,
  156.                       buffer);
  157.         nulls[attoff] = (attnull ? 'n' : ' ');
  158.     }
  159.  
  160.     /* form an index tuple and point it at the heap tuple */
  161.     itup = FormIndexTuple(natts, id, &d[0], nulls);
  162.     itup->t_tid = htup->t_ctid;
  163.  
  164.     /*
  165.      *  Since we already have the index relation locked, we
  166.      *  call rtdoinsert directly.  Normal access method calls
  167.      *  dispatch through rtinsert, which locks the relation
  168.      *  for write.  This is the right thing to do if you're
  169.      *  inserting single tups, but not when you're initializing
  170.      *  the whole index at once.
  171.      */
  172.  
  173.     res = rtdoinsert(index, itup);
  174.     pfree(itup);
  175.     pfree(res);
  176.     }
  177.  
  178.     /* okay, all heap tuples are indexed */
  179.     heap_endscan(scan);
  180.     RelationUnsetLockForWrite(index);
  181.  
  182.     if (pred != LispNil) {
  183.     ExecDestroyTupleTable(tupleTable, false);
  184.     }
  185.  
  186.     /*
  187.      *  Since we just counted the tuples in the heap, we update its
  188.      *  stats in pg_relation to guarantee that the planner takes
  189.      *  advantage of the index we just created.  UpdateStats() does a
  190.      *  CommandCounterIncrement(), which flushes changed entries from
  191.      *  the system relcache.  The act of constructing an index changes
  192.      *  these heap and index tuples in the system catalogs, so they
  193.      *  need to be flushed.  We close them to guarantee that they
  194.      *  will be.
  195.      */
  196.  
  197.     hrelid = heap->rd_id;
  198.     irelid = index->rd_id;
  199.     heap_close(heap);
  200.     index_close(index);
  201.  
  202.     UpdateStats(hrelid, nh, true);
  203.     UpdateStats(irelid, ni, false);
  204.  
  205.     /* be tidy */
  206.     pfree(nulls);
  207.     pfree(d);
  208. }
  209.  
  210. /*
  211.  *  rtinsert -- wrapper for rtree tuple insertion.
  212.  *
  213.  *    This is the public interface routine for tuple insertion in rtrees.
  214.  *    It doesn't do any work; just locks the relation and passes the buck.
  215.  */
  216.  
  217. InsertIndexResult
  218. rtinsert(r, itup)
  219.     Relation r;
  220.     IndexTuple itup;
  221. {
  222.     InsertIndexResult res;
  223.  
  224.     RelationSetLockForWrite(r);
  225.     res = rtdoinsert(r, itup);
  226.  
  227.     /* XXX two-phase locking -- don't unlock the relation until EOT */
  228.     return (res);
  229. }
  230.  
  231. InsertIndexResult
  232. rtdoinsert(r, itup)
  233.     Relation r;
  234.     IndexTuple itup;
  235. {
  236.     Page page;
  237.     Buffer buffer;
  238.     BlockNumber blk;
  239.     IndexTuple which;
  240.     OffsetNumber l;
  241.     RTSTACK *stack;
  242.     InsertIndexResult res;
  243.     RTreePageOpaque opaque;
  244.     char *datum;
  245.  
  246.     blk = P_ROOT;
  247.     buffer = InvalidBuffer;
  248.     stack = (RTSTACK *) NULL;
  249.  
  250.     do {
  251.     /* let go of current buffer before getting next */
  252.     if (buffer != InvalidBuffer)
  253.         ReleaseBuffer(buffer);
  254.  
  255.     /* get next buffer */
  256.     buffer = ReadBuffer(r, blk);
  257.     page = (Page) BufferGetPage(buffer, 0);
  258.  
  259.     opaque = (RTreePageOpaque) PageGetSpecialPointer(page);
  260.     if (!(opaque->flags & F_LEAF)) {
  261.         RTSTACK *n;
  262.         ItemId iid;
  263.  
  264.         n = (RTSTACK *) palloc(sizeof(RTSTACK));
  265.         n->rts_parent = stack;
  266.         n->rts_blk = blk;
  267.         n->rts_child = choose(r, page, itup);
  268.         stack = n;
  269.  
  270.         iid = PageGetItemId(page, n->rts_child);
  271.         which = (IndexTuple) PageGetItem(page, iid);
  272.         blk = ItemPointerGetBlockNumber(&(which->t_tid));
  273.     }
  274.     } while (!(opaque->flags & F_LEAF));
  275.  
  276.     if (nospace(page, itup)) {
  277.     /* need to do a split */
  278.     res = dosplit(r, buffer, stack, itup);
  279.     freestack(stack);
  280.     WriteBuffer(buffer);  /* don't forget to release buffer! */
  281.     return (res);
  282.     }
  283.  
  284.     /* add the item and write the buffer */
  285.     if (PageIsEmpty(page))
  286.     PageAddItem(page, (Item) itup, IndexTupleSize(itup), 1, LP_USED);
  287.     else
  288.     PageAddItem(page, (Item) itup, IndexTupleSize(itup),
  289.             PageGetMaxOffsetIndex(page) + 2, LP_USED);
  290.  
  291.     WriteBuffer(buffer);
  292.  
  293.     datum = (((char *) itup) + sizeof(IndexTupleData));
  294.  
  295.     /* now expand the page boundary in the parent to include the new child */
  296.     rttighten(r, stack, datum, (IndexTupleSize(itup) - sizeof(IndexTupleData)));
  297.     freestack(stack);
  298.  
  299.     /* build and return an InsertIndexResult for this insertion */
  300.     res = (InsertIndexResult) palloc(sizeof(InsertIndexResultData));
  301.     ItemPointerSet(&(res->pointerData), 0, blk, 0, l);
  302.     res->lock = (RuleLock) NULL;
  303.     res->offset = (double) 0;
  304.  
  305.     return (res);
  306. }
  307.  
  308. rttighten(r, stk, datum, att_size)
  309.     Relation r;
  310.     RTSTACK *stk;
  311.     char *datum;
  312.     int att_size;
  313. {
  314.     char *oldud;
  315.     Page p;
  316.     int old_size, newd_size;
  317.     RegProcedure union_proc, size_proc;
  318.     Buffer b;
  319.  
  320.     if (stk == (RTSTACK *) NULL)
  321.     return;
  322.  
  323.     b = ReadBuffer(r, stk->rts_blk);
  324.     p = BufferGetPage(b, 0);
  325.  
  326.     union_proc = index_getprocid(r, 1, RT_UNION_PROC);
  327.     size_proc = index_getprocid(r, 1, RT_SIZE_PROC);
  328.  
  329.     oldud = (char *) PageGetItem(p, PageGetItemId(p, stk->rts_child));
  330.     oldud += sizeof(IndexTupleData);
  331.  
  332.     old_size = (int) fmgr(size_proc, oldud);
  333.     datum = (char *) fmgr(union_proc, oldud, datum);
  334.  
  335.     newd_size = (int) fmgr(size_proc, datum);
  336.  
  337.     /* XXX assume constant-size data items here */
  338.     if (newd_size != old_size) {
  339.     bcopy(datum, oldud, att_size);
  340.     WriteBuffer(b);
  341.     rttighten(r, stk->rts_parent, datum, att_size);
  342.     } else {
  343.     ReleaseBuffer(b);
  344.     }
  345.     pfree(datum);
  346. }
  347.  
  348. /*
  349.  *  dosplit -- split a page in the tree.
  350.  *
  351.  *    This is the quadratic-cost split algorithm Guttman describes in
  352.  *    his paper.  The reason we chose it is that you can implement this
  353.  *    with less information about the data types on which you're operating.
  354.  */
  355.  
  356. InsertIndexResult
  357. dosplit(r, buffer, stack, itup)
  358.     Relation r;
  359.     Buffer buffer;
  360.     RTSTACK *stack;
  361.     IndexTuple itup;
  362. {
  363.     Page p;
  364.     Buffer leftbuf, rightbuf;
  365.     Page left, right;
  366.     ItemId itemid;
  367.     IndexTuple item;
  368.     IndexTuple ltup, rtup;
  369.     OffsetNumber maxoff;
  370.     OffsetIndex i;
  371.     OffsetIndex leftoff, rightoff;
  372.     BlockNumber lbknum, rbknum;
  373.     BlockNumber bufblock;
  374.     RTreePageOpaque opaque;
  375.     int blank;
  376.     InsertIndexResult res;
  377.     char *isnull;
  378.     SPLITVEC v;
  379.  
  380.     isnull = (char *) palloc(r->rd_rel->relnatts);
  381.     for (blank = 0; blank < r->rd_rel->relnatts; blank++)
  382.     isnull[blank] = ' ';
  383.     p = (Page) BufferGetPage(buffer, 0);
  384.     opaque = (RTreePageOpaque) PageGetSpecialPointer(p);
  385.  
  386.     /*
  387.      *  The root of the tree is the first block in the relation.  If
  388.      *  we're about to split the root, we need to do some hocus-pocus
  389.      *  to enforce this guarantee.
  390.      */
  391.  
  392.     if (BufferGetBlockNumber(buffer) == P_ROOT) {
  393.     leftbuf = ReadBuffer(r, P_NEW);
  394.     RTInitBuffer(leftbuf, opaque->flags);
  395.     lbknum = BufferGetBlockNumber(leftbuf);
  396.     left = (Page) BufferGetPage(leftbuf, 0);
  397.     } else {
  398.     leftbuf = buffer;
  399.     IncrBufferRefCount(buffer);
  400.     lbknum = BufferGetBlockNumber(buffer);
  401.     left = (Page) PageGetTempPage(p, sizeof(RTreePageOpaqueData));
  402.     }
  403.  
  404.     rightbuf = ReadBuffer(r, P_NEW);
  405.     RTInitBuffer(rightbuf, opaque->flags);
  406.     rbknum = BufferGetBlockNumber(rightbuf);
  407.     right = (Page) BufferGetPage(rightbuf, 0);
  408.  
  409.     picksplit(r, p, &v, itup);
  410.  
  411.     leftoff = rightoff = 0;
  412.     maxoff = PageGetMaxOffsetIndex(p);
  413.     for (i = 0; i <= maxoff; i++) {
  414.     char *dp;
  415.  
  416.     itemid = PageGetItemId(p, i);
  417.     item = (IndexTuple) PageGetItem(p, itemid);
  418.  
  419.     if (i == *(v.spl_left)) {
  420.         (void) PageAddItem(left, (Item) item, IndexTupleSize(item),
  421.                    leftoff++, LP_USED);
  422.         v.spl_left++;
  423.     } else {
  424.         (void) PageAddItem(right, (Item) item, IndexTupleSize(item),
  425.                    rightoff++, LP_USED);
  426.         v.spl_right++;
  427.     }
  428.     }
  429.  
  430.     /* build an InsertIndexResult for this insertion */
  431.     res = (InsertIndexResult) palloc(sizeof(InsertIndexResultData));
  432.     res->lock = (RuleLock) NULL;
  433.     res->offset = (double) 0;
  434.  
  435.     /* now insert the new index tuple */
  436.     if (*(v.spl_left) != (OffsetNumber) 0) {
  437.     (void) PageAddItem(left, (Item) itup, IndexTupleSize(itup),
  438.                leftoff++, LP_USED);
  439.     ItemPointerSet(&(res->pointerData), 0, lbknum, 0, leftoff);
  440.     } else {
  441.     (void) PageAddItem(right, (Item) itup, IndexTupleSize(itup),
  442.                rightoff++, LP_USED);
  443.     ItemPointerSet(&(res->pointerData), 0, rbknum, 0, rightoff);
  444.     }
  445.  
  446.     if ((bufblock = BufferGetBlockNumber(buffer)) != P_ROOT) {
  447.     PageRestoreTempPage(left, p);
  448.     }
  449.     WriteBuffer(leftbuf);
  450.     WriteBuffer(rightbuf);
  451.  
  452.     /*
  453.      *  Okay, the page is split.  We have three things left to do:
  454.      *
  455.      *    1)  Adjust any active scans on this index to cope with changes
  456.      *        we introduced in its structure by splitting this page.
  457.      *
  458.      *    2)  "Tighten" the bounding box of the pointer to the left
  459.      *          page in the parent node in the tree, if any.  Since we
  460.      *          moved a bunch of stuff off the left page, we expect it
  461.      *          to get smaller.  This happens in the internal insertion
  462.      *        routine.
  463.      *
  464.      *    3)  Insert a pointer to the right page in the parent.  This
  465.      *          may cause the parent to split.  If it does, we need to
  466.      *          repeat steps one and two for each split node in the tree.
  467.      */
  468.  
  469.     /* adjust active scans */
  470.     rtadjscans(r, RTOP_SPLIT, bufblock, 0);
  471.  
  472.     ltup = (IndexTuple) index_formtuple(r->rd_rel->relnatts,
  473.                    (TupleDescriptor) (&r->rd_att.data[0]),
  474.                        (Datum *) &(v.spl_ldatum), isnull);
  475.     rtup = (IndexTuple) index_formtuple(r->rd_rel->relnatts,
  476.                    (TupleDescriptor) &r->rd_att.data[0],
  477.                    (Datum *) &(v.spl_rdatum), isnull);
  478.     pfree (isnull);
  479.  
  480.     /* set pointers to new child pages in the internal index tuples */
  481.     ItemPointerSet(&(ltup->t_tid), 0, lbknum, 0, 1);
  482.     ItemPointerSet(&(rtup->t_tid), 0, rbknum, 0, 1);
  483.  
  484.     rtintinsert(r, stack, ltup, rtup);
  485.  
  486.     pfree(ltup);
  487.     pfree(rtup);
  488.  
  489.     return (res);
  490. }
  491.  
  492. rtintinsert(r, stk, ltup, rtup)
  493.     Relation r;
  494.     RTSTACK *stk;
  495.     IndexTuple ltup;
  496.     IndexTuple rtup;
  497. {
  498.     IndexTuple old;
  499.     IndexTuple it;
  500.     Buffer b;
  501.     Page p;
  502.     RegProcedure union_proc, size_proc;
  503.     char *ldatum, *rdatum, *newdatum;
  504.     InsertIndexResult res;
  505.  
  506.     if (stk == (RTSTACK *) NULL) {
  507.     rtnewroot(r, ltup, rtup);
  508.     return;
  509.     }
  510.  
  511.     union_proc = index_getprocid(r, 1, RT_UNION_PROC);
  512.     b = ReadBuffer(r, stk->rts_blk);
  513.     p = BufferGetPage(b, 0);
  514.     old = (IndexTuple) PageGetItem(p, PageGetItemId(p, stk->rts_child));
  515.  
  516.     /*
  517.      *  This is a hack.  Right now, we force rtree keys to be constant size.
  518.      *  To fix this, need delete the old key and add both left and right
  519.      *  for the two new pages.  The insertion of left may force a split if
  520.      *  the new left key is bigger than the old key.
  521.      */
  522.  
  523.     if (IndexTupleSize(old) != IndexTupleSize(ltup))
  524.     elog(WARN, "Variable-length rtree keys are not supported.");
  525.  
  526.     /* install pointer to left child */
  527.     bcopy(ltup, old, IndexTupleSize(ltup));
  528.  
  529.     if (nospace(p, rtup)) {
  530.     newdatum = (((char *) ltup) + sizeof(IndexTupleData));
  531.     rttighten(r, stk->rts_parent, newdatum,
  532.           (IndexTupleSize(ltup) - sizeof(IndexTupleData)));
  533.     res = dosplit(r, b, stk->rts_parent, rtup);
  534.     pfree (res);
  535.     } else {
  536.     PageAddItem(p, (Item) rtup, IndexTupleSize(rtup),
  537.             PageGetMaxOffsetIndex(p), LP_USED);
  538.     WriteBuffer(b);
  539.     ldatum = (((char *) ltup) + sizeof(IndexTupleData));
  540.     rdatum = (((char *) rtup) + sizeof(IndexTupleData));
  541.     newdatum = (char *) fmgr(union_proc, ldatum, rdatum);
  542.  
  543.     rttighten(r, stk->rts_parent, newdatum,
  544.           (IndexTupleSize(rtup) - sizeof(IndexTupleData)));
  545.  
  546.     pfree(newdatum);
  547.     }
  548. }
  549.  
  550. rtnewroot(r, lt, rt)
  551.     Relation r;
  552.     IndexTuple lt;
  553.     IndexTuple rt;
  554. {
  555.     Buffer b;
  556.     Page p;
  557.  
  558.     b = ReadBuffer(r, P_ROOT);
  559.     RTInitBuffer(b, 0);
  560.     p = BufferGetPage(b, 0);
  561.     PageAddItem(p, (Item) lt, IndexTupleSize(lt), 0, LP_USED);
  562.     PageAddItem(p, (Item) rt, IndexTupleSize(rt), 1, LP_USED);
  563.     WriteBuffer(b);
  564. }
  565.  
  566. picksplit(r, page, v, itup)
  567.     Relation r;
  568.     Page page;
  569.     SPLITVEC *v;
  570.     IndexTuple itup;
  571. {
  572.     OffsetNumber maxoff;
  573.     OffsetNumber i, j;
  574.     IndexTuple item_1, item_2;
  575.     char *datum_alpha, *datum_beta;
  576.     char *datum_l, *datum_r;
  577.     char *union_d, *union_dl, *union_dr;
  578.     char *inter_d;
  579.     RegProcedure union_proc;
  580.     RegProcedure size_proc;
  581.     RegProcedure inter_proc;
  582.     bool firsttime;
  583.     int waste;
  584.     int size_alpha, size_beta, size_union, size_waste, size_inter;
  585.     int size_l, size_r;
  586.     int nbytes;
  587.     OffsetNumber seed_1, seed_2;
  588.     OffsetNumber *left, *right;
  589.  
  590.     union_proc = index_getprocid(r, 1, RT_UNION_PROC);
  591.     size_proc = index_getprocid(r, 1, RT_SIZE_PROC);
  592.     inter_proc = index_getprocid(r, 1, RT_INTER_PROC);
  593.     maxoff = PageGetMaxOffsetIndex(page);
  594.  
  595.     nbytes = (maxoff + 3) * sizeof(OffsetNumber);
  596.     v->spl_left = (OffsetNumber *) palloc(nbytes);
  597.     v->spl_right = (OffsetNumber *) palloc(nbytes);
  598.  
  599.     firsttime = true;
  600.     waste = 0;
  601.  
  602.     for (i = 0; i < maxoff; i++) {
  603.     item_1 = (IndexTuple) PageGetItem(page, PageGetItemId(page, i));
  604.     datum_alpha = ((char *) item_1) + sizeof(IndexTupleData);
  605.     for (j = i + 1; j <= maxoff; j++) {
  606.         item_2 = (IndexTuple) PageGetItem(page, PageGetItemId(page, j));
  607.         datum_beta = ((char *) item_2) + sizeof(IndexTupleData);
  608.  
  609.         /* compute the wasted space by unioning these guys */
  610.         union_d = (char *) fmgr(union_proc, datum_alpha, datum_beta);
  611.         size_union = (int) fmgr(size_proc, union_d);
  612.         inter_d = (char *) fmgr(inter_proc, datum_alpha, datum_beta);
  613.         size_inter = (int) fmgr(size_proc, inter_d);
  614.         size_waste = size_union - size_inter;
  615.  
  616.         pfree(union_d);
  617.         if (inter_d != (char *) NULL)
  618.             pfree(inter_d);
  619.  
  620.         /*
  621.          *  are these a more promising split that what we've
  622.          *  already seen?
  623.          */
  624.  
  625.         if (size_waste > waste || firsttime) {
  626.         waste = size_waste;
  627.         seed_1 = i;
  628.         seed_2 = j;
  629.         firsttime = false;
  630.         }
  631.     }
  632.     }
  633.  
  634.     left = v->spl_left;
  635.     v->spl_nleft = 0;
  636.     right = v->spl_right;
  637.     v->spl_nright = 0;
  638.  
  639.     item_1 = (IndexTuple) PageGetItem(page, PageGetItemId(page, seed_1));
  640.     datum_alpha = ((char *) item_1) + sizeof(IndexTupleData);
  641.     datum_l = (char *) fmgr(union_proc, datum_alpha, datum_alpha);
  642.     size_l = (int) fmgr(size_proc, datum_l);
  643.     item_2 = (IndexTuple) PageGetItem(page, PageGetItemId(page, seed_2));
  644.     datum_beta = ((char *) item_2) + sizeof(IndexTupleData);
  645.     datum_r = (char *) fmgr(union_proc, datum_beta, datum_beta);
  646.     size_r = (int) fmgr(size_proc, datum_r);
  647.  
  648.     /*
  649.      *  Now split up the regions between the two seeds.  An important
  650.      *  property of this split algorithm is that the split vector v
  651.      *  has the indices of items to be split in order in its left and
  652.      *  right vectors.  We exploit this property by doing a merge in
  653.      *  the code that actually splits the page.
  654.      *
  655.      *  For efficiency, we also place the new index tuple in this loop.
  656.      *  This is handled at the very end, when we have placed all the
  657.      *  existing tuples and i == maxoff + 1.
  658.      */
  659.  
  660.     maxoff++;
  661.     for (i = 0; i <= maxoff; i++) {
  662.  
  663.     /*
  664.      *  If we've already decided where to place this item, just
  665.      *  put it on the right list.  Otherwise, we need to figure
  666.      *  out which page needs the least enlargement in order to
  667.      *  store the item.
  668.      */
  669.  
  670.     if (i == seed_1) {
  671.         *left++ = i;
  672.         v->spl_nleft++;
  673.         continue;
  674.     } else if (i == seed_2) {
  675.         *right++ = i;
  676.         v->spl_nright++;
  677.         continue;
  678.     }
  679.  
  680.     /* okay, which page needs least enlargement? */ 
  681.     if (i == maxoff)
  682.         item_1 = itup;
  683.     else
  684.         item_1 = (IndexTuple) PageGetItem(page, PageGetItemId(page, i));
  685.  
  686.     datum_alpha = ((char *) item_1) + sizeof(IndexTupleData);
  687.     union_dl = (char *) fmgr(union_proc, datum_l, datum_alpha);
  688.     union_dr = (char *) fmgr(union_proc, datum_r, datum_alpha);
  689.     size_alpha = (int) fmgr(size_proc, union_dl);
  690.     size_beta = (int) fmgr(size_proc, union_dr);
  691.  
  692.     /* pick which page to add it to */
  693.     if (size_alpha - size_l < size_beta - size_r) {
  694.         pfree(datum_l);
  695.         pfree(union_dr);
  696.         datum_l = union_dl;
  697.         size_l = size_alpha;
  698.         *left++ = i;
  699.         v->spl_nleft++;
  700.     } else {
  701.         pfree(datum_r);
  702.         pfree(union_dl);
  703.         datum_r = union_dr;
  704.         size_r = size_alpha;
  705.         *right++ = i;
  706.         v->spl_nright++;
  707.     }
  708.     }
  709.     *left = *right = (OffsetNumber)0;
  710.  
  711.     v->spl_ldatum = datum_l;
  712.     v->spl_rdatum = datum_r;
  713. }
  714.  
  715. RTInitBuffer(b, f)
  716.     Buffer b;
  717.     uint32 f;
  718. {
  719.     RTreePageOpaque opaque;
  720.     Page page;
  721.     PageSize pageSize;
  722.  
  723.     pageSize = BufferGetBlockSize(b) / PagePartitionGetPagesPerBlock(0);
  724.     BufferInitPage(b, pageSize);
  725.  
  726.     page = BufferGetPage(b, FirstPageNumber);
  727.     bzero(page, (int) pageSize);
  728.     PageInit(page, pageSize, sizeof(RTreePageOpaqueData));
  729.  
  730.     opaque = (RTreePageOpaque) PageGetSpecialPointer(page);
  731.     opaque->flags = f;
  732. }
  733.  
  734. choose(r, p, it)
  735.     Relation r;
  736.     Page p;
  737.     IndexTuple it;
  738. {
  739.     int maxoff;
  740.     int i;
  741.     char *ud, *id;
  742.     char *datum;
  743.     int isize, usize, dsize;
  744.     int which, which_grow;
  745.     RegProcedure union_proc, size_proc;
  746.  
  747.     union_proc = index_getprocid(r, 1, RT_UNION_PROC);
  748.     size_proc = index_getprocid(r, 1, RT_SIZE_PROC);
  749.     id = ((char *) it) + sizeof(IndexTupleData);
  750.     isize = (int) fmgr(size_proc, id);
  751.     maxoff = PageGetMaxOffsetIndex(p);
  752.     which_grow = -1;
  753.     which = -1;
  754.  
  755.     for (i = 0; i <= maxoff; i++) {
  756.     datum = (char *) PageGetItem(p, PageGetItemId(p, i));
  757.     datum += sizeof(IndexTupleData);
  758.     dsize = (int) fmgr(size_proc, datum);
  759.     ud = (char *) fmgr(union_proc, datum, id);
  760.     usize = (int) fmgr(size_proc, ud);
  761.     pfree(ud);
  762.     if (which_grow < 0 || usize - dsize < which_grow) {
  763.         which = i;
  764.         which_grow = usize - dsize;
  765.         if (which_grow == 0)
  766.         break;
  767.     }
  768.     }
  769.     return (which);
  770. }
  771.  
  772. int
  773. nospace(p, it)
  774.     Page p;
  775.     IndexTuple it;
  776. {
  777.     return (PageGetFreeSpace(p) < IndexTupleSize(it));
  778. }
  779.  
  780. freestack(s)
  781.     RTSTACK *s;
  782. {
  783.     RTSTACK *p;
  784.  
  785.     while (s != (RTSTACK *) NULL) {
  786.     p = s->rts_parent;
  787.     pfree (s);
  788.     s = p;
  789.     }
  790. }
  791.  
  792. char *
  793. rtdelete(r, tid)
  794.     Relation r;
  795.     ItemPointer tid;
  796. {
  797.     BlockNumber blkno;
  798.     OffsetIndex offind;
  799.     Buffer buf;
  800.     Page page;
  801.  
  802.     /* must write-lock on delete */
  803.     RelationSetLockForWrite(r);
  804.  
  805.     blkno = ItemPointerGetBlockNumber(tid);
  806.     offind = ItemPointerGetOffsetNumber(tid, 0) - 1;
  807.  
  808.     /* adjust any scans that will be affected by this deletion */
  809.     rtadjscans(r, RTOP_DEL, blkno, offind);
  810.  
  811.     /* delete the index tuple */
  812.     buf = ReadBuffer(r, blkno);
  813.     page = BufferGetPage(buf, 0);
  814.  
  815.     PageIndexTupleDelete(page, offind + 1);
  816.  
  817.     WriteBuffer(buf);
  818.  
  819.     /* XXX -- two-phase locking, don't release the write lock */
  820.     return ((char *) NULL);
  821. }
  822.  
  823. #define RTDEBUG
  824. #ifdef RTDEBUG
  825.  
  826. _rtdump(r)
  827.     Relation r;
  828. {
  829.     Buffer buf;
  830.     Page page;
  831.     OffsetIndex offind, maxoff;
  832.     BlockNumber blkno;
  833.     BlockNumber nblocks;
  834.     RTreePageOpaque po;
  835.     IndexTuple itup;
  836.     BlockNumber itblkno;
  837.     PageNumber itpgno;
  838.     OffsetNumber itoffno;
  839.     char *datum;
  840.     char *itkey;
  841.  
  842.     nblocks = RelationGetNumberOfBlocks(r);
  843.     for (blkno = 0; blkno < nblocks; blkno++) {
  844.     buf = ReadBuffer(r, blkno);
  845.     page = BufferGetPage(buf, 0);
  846.     po = (RTreePageOpaque) PageGetSpecialPointer(page);
  847.     maxoff = PageGetMaxOffsetIndex(page);
  848.     printf("Page %d maxoff %d <%s>\n", blkno, maxoff,
  849.         (po->flags & F_LEAF ? "LEAF" : "INTERNAL"));
  850.  
  851.     if (PageIsEmpty(page)) {
  852.         ReleaseBuffer(buf);
  853.         continue;
  854.     }
  855.  
  856.     for (offind = 0; offind <= maxoff; offind++) {
  857.         itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, offind));
  858.         itblkno = ItemPointerGetBlockNumber(&(itup->t_tid));
  859.         itpgno = ItemPointerGetPageNumber(&(itup->t_tid), 0);
  860.         itoffno = ItemPointerGetOffsetNumber(&(itup->t_tid), 0);
  861.         datum = ((char *) itup);
  862.         datum += sizeof(IndexTupleData);
  863.         itkey = (char *) box_out(datum);
  864.         printf("\t[%d] size %d heap <%d,%d,%d> key:%s\n",
  865.            offind + 1, IndexTupleSize(itup), itblkno, itpgno,
  866.            itoffno, itkey);
  867.         pfree(itkey);
  868.     }
  869.  
  870.     ReleaseBuffer(buf);
  871.     }
  872. }
  873. #endif /* defined RTDEBUG */
  874.